{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "e9ae4c8b-4599-4fbb-a545-76b6e3bcb84d",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Physical Plan ==\n",
      "AdaptiveSparkPlan isFinalPlan=false\n",
      "+- ObjectHashAggregate(keys=[device_id#598, device_type#601], functions=[collect_list(user_id#568, 0, 0)])\n",
      "   +- ObjectHashAggregate(keys=[device_id#598, device_type#601], functions=[partial_collect_list(user_id#568, 0, 0)])\n",
      "      +- Project [device_id#598, device_type#601, user_id#568]\n",
      "         +- SortMergeJoin [device_id#598], [device_id#569], Inner\n",
      "            :- Sort [device_id#598 ASC NULLS FIRST], false, 0\n",
      "            :  +- Exchange hashpartitioning(device_id#598, 4), ENSURE_REQUIREMENTS, [plan_id=735]\n",
      "            :     +- Filter isnotnull(device_id#598)\n",
      "            :        +- FileScan csv [device_id#598,device_type#601] Batched: false, DataFilters: [isnotnull(device_id#598)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/devices.csv], PartitionFilters: [], PushedFilters: [IsNotNull(device_id)], ReadSchema: struct<device_id:int,device_type:string>\n",
      "            +- Sort [device_id#569 ASC NULLS FIRST], false, 0\n",
      "               +- Exchange hashpartitioning(device_id#569, 4), ENSURE_REQUIREMENTS, [plan_id=736]\n",
      "                  +- Filter isnotnull(device_id#569)\n",
      "                     +- InMemoryTableScan [user_id#568, device_id#569], [isnotnull(device_id#569)]\n",
      "                           +- InMemoryRelation [user_id#568, device_id#569, event_counts#606L, host_array#607], StorageLevel(disk, memory, deserialized, 1 replicas)\n",
      "                                 +- AdaptiveSparkPlan isFinalPlan=false\n",
      "                                    +- ObjectHashAggregate(keys=[user_id#17, device_id#18], functions=[count(1), collect_list(distinct host#20, 0, 0)])\n",
      "                                       +- Exchange hashpartitioning(user_id#17, device_id#18, 4), ENSURE_REQUIREMENTS, [plan_id=752]\n",
      "                                          +- ObjectHashAggregate(keys=[user_id#17, device_id#18], functions=[merge_count(1), partial_collect_list(distinct host#20, 0, 0)])\n",
      "                                             +- HashAggregate(keys=[user_id#17, device_id#18, host#20], functions=[merge_count(1)])\n",
      "                                                +- Exchange hashpartitioning(user_id#17, device_id#18, host#20, 4), ENSURE_REQUIREMENTS, [plan_id=748]\n",
      "                                                   +- HashAggregate(keys=[user_id#17, device_id#18, host#20], functions=[partial_count(1)])\n",
      "                                                      +- Filter isnotnull(user_id#17)\n",
      "                                                         +- FileScan csv [user_id#17,device_id#18,host#20] Batched: false, DataFilters: [isnotnull(user_id#17)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int,device_id:int,host:string>\n",
      "\n",
      "\n",
      "== Physical Plan ==\n",
      "AdaptiveSparkPlan isFinalPlan=false\n",
      "+- ObjectHashAggregate(keys=[user_id#568], functions=[max(event_counts#606L), collect_list(device_id#569, 0, 0)])\n",
      "   +- ObjectHashAggregate(keys=[user_id#568], functions=[partial_max(event_counts#606L), partial_collect_list(device_id#569, 0, 0)])\n",
      "      +- Project [user_id#568, device_id#569, event_counts#606L]\n",
      "         +- SortMergeJoin [user_id#568], [user_id#614], Inner\n",
      "            :- Sort [user_id#568 ASC NULLS FIRST], false, 0\n",
      "            :  +- Exchange hashpartitioning(user_id#568, 4), ENSURE_REQUIREMENTS, [plan_id=788]\n",
      "            :     +- Filter isnotnull(user_id#568)\n",
      "            :        +- FileScan csv [user_id#568,device_id#569] Batched: false, DataFilters: [isnotnull(user_id#568)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int,device_id:int>\n",
      "            +- Sort [user_id#614 ASC NULLS FIRST], false, 0\n",
      "               +- Exchange hashpartitioning(user_id#614, 4), ENSURE_REQUIREMENTS, [plan_id=789]\n",
      "                  +- Filter isnotnull(user_id#614)\n",
      "                     +- InMemoryTableScan [user_id#614, event_counts#606L], [isnotnull(user_id#614)]\n",
      "                           +- InMemoryRelation [user_id#614, device_id#615, event_counts#606L, host_array#607], StorageLevel(disk, memory, deserialized, 1 replicas)\n",
      "                                 +- AdaptiveSparkPlan isFinalPlan=false\n",
      "                                    +- ObjectHashAggregate(keys=[user_id#17, device_id#18], functions=[count(1), collect_list(distinct host#20, 0, 0)])\n",
      "                                       +- Exchange hashpartitioning(user_id#17, device_id#18, 4), ENSURE_REQUIREMENTS, [plan_id=805]\n",
      "                                          +- ObjectHashAggregate(keys=[user_id#17, device_id#18], functions=[merge_count(1), partial_collect_list(distinct host#20, 0, 0)])\n",
      "                                             +- HashAggregate(keys=[user_id#17, device_id#18, host#20], functions=[merge_count(1)])\n",
      "                                                +- Exchange hashpartitioning(user_id#17, device_id#18, host#20, 4), ENSURE_REQUIREMENTS, [plan_id=801]\n",
      "                                                   +- HashAggregate(keys=[user_id#17, device_id#18, host#20], functions=[partial_count(1)])\n",
      "                                                      +- Filter isnotnull(user_id#17)\n",
      "                                                         +- FileScan csv [user_id#17,device_id#18,host#20] Batched: false, DataFilters: [isnotnull(user_id#17)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int,device_id:int,host:string>\n",
      "\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "import org.apache.spark.sql.functions._\n",
       "import org.apache.spark.storage.StorageLevel\n",
       "users: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user_id: int, device_id: int ... 4 more fields]\n",
       "devices: org.apache.spark.sql.DataFrame = [device_id: int, browser_type: string ... 2 more fields]\n",
       "executionDate: String = 2023-01-01\n",
       "eventsAggregated: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user_id: int, device_id: int ... 2 more fields]\n",
       "usersAndDevices: org.apache.spark.sql.DataFrame = [user_id: int, user_id: int ... 2 more fields]\n",
       "devicesOnEvents: org.apache.spark.sql.DataFrame = [device_id: int, device_type: string ... 3 more fields]\n",
       "res1: Array[org.apache.spark.sql.Row] = Array([-2147470439,-2147470439,3,WrappedArray(378988111, 378988111, 378988111)])\n"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import org.apache.spark.sql.functions._\n",
    "import org.apache.spark.storage.StorageLevel\n",
    "\n",
    "// Session-level Spark SQL settings, applied in order before any DataFrame is built.\n",
    "Seq(\n",
    "  \"spark.sql.autoBroadcastJoinThreshold\" -> \"-1\",        // -1 turns automatic broadcast joins off\n",
    "  \"spark.sql.analyzer.failAmbiguousSelfJoin\" -> \"false\", // do not raise on ambiguous self-join column references\n",
    "  \"spark.sql.shuffle.partitions\" -> \"4\"                  // partition count used by shuffles\n",
    ").foreach { case (key, value) => spark.conf.set(key, value) }\n",
    "\n",
    "// Event rows from CSV (header row present, column types inferred); rows with a null user_id are dropped.\n",
    "val users = spark.read\n",
    "  .options(Map(\"header\" -> \"true\", \"inferSchema\" -> \"true\"))\n",
    "  .csv(\"/home/iceberg/data/events.csv\")\n",
    "  .filter(col(\"user_id\").isNotNull)\n",
    "\n",
    "// Make the filtered events queryable from SQL as `events`.\n",
    "users.createOrReplaceTempView(\"events\")\n",
    "\n",
    "// Device rows, read with the same CSV settings.\n",
    "val devices = spark.read\n",
    "  .options(Map(\"header\" -> \"true\", \"inferSchema\" -> \"true\"))\n",
    "  .csv(\"/home/iceberg/data/devices.csv\")\n",
    "\n",
    "// Make the devices queryable from SQL as `devices`.\n",
    "devices.createOrReplaceTempView(\"devices\")\n",
    "\n",
    "// Not referenced by the other visible cells; presumably the `ds` partition value for the staging table (TODO confirm).\n",
    "val executionDate = \"2023-01-01\"\n",
    "\n",
    "// Per (user_id, device_id): number of events and the distinct hosts seen.\n",
    "// The result is persisted because both joins below read it.\n",
    "// - Keep the persisted data small (< 5 GB); a dataset that small is also a candidate for a broadcast join.\n",
    "// - Size executor memory for it: anything that does not fit in memory makes the job slow.\n",
    "// - Persist with MEMORY_ONLY explicitly: a bare .cache() on a Dataset uses MEMORY_AND_DISK,\n",
    "//   which is the disk spill the notes above warn about.\n",
    "\n",
    "val eventsAggregated = spark.sql(\"\"\"\n",
    "  SELECT user_id, \n",
    "          device_id, \n",
    "        COUNT(1) as event_counts, \n",
    "        COLLECT_LIST(DISTINCT host) as host_array\n",
    "  FROM events\n",
    "  GROUP BY 1,2\n",
    "\"\"\").persist(StorageLevel.MEMORY_ONLY)\n",
    "\n",
    "// Disabled alternative: write the aggregate straight into the staging table.\n",
    "// eventsAggregated.write.mode(\"overwrite\").saveAsTable(\"bootcamp.events_aggregated_staging\")\n",
    "\n",
    "// DDL for the staging table; IF NOT EXISTS lets this cell be re-run against an existing table.\n",
    "val createStagingTableSql = \"\"\"\n",
    "    CREATE TABLE IF NOT EXISTS bootcamp.events_aggregated_staging (\n",
    "        user_id BIGINT,\n",
    "        device_id BIGINT,\n",
    "        event_counts BIGINT,\n",
    "        host_array ARRAY<STRING>\n",
    "    )\n",
    "    PARTITIONED BY (ds STRING)\n",
    "\"\"\"\n",
    "\n",
    "spark.sql(createStagingTableSql)\n",
    "\n",
    "\n",
    "// Per user: the largest per-device event count and the device ids from the aggregate.\n",
    "// `eventsAggregated` is derived from `users`, so this is a self-join. Both sides are aliased so that\n",
    "// every column reference names its side explicitly; resolving through eventsAggregated(\"device_id\")\n",
    "// can silently bind to the `users` side when the ambiguous-self-join check is disabled.\n",
    "// The grouping key is not repeated inside agg(): groupBy already emits it, and repeating it\n",
    "// yields a duplicate `user_id` column.\n",
    "// NOTE(review): the left side is the raw event rows, so each device id is collected once per matching\n",
    "// raw event; use collect_set if a distinct list is intended. `total_hits` is a max, not a sum.\n",
    "val usersAndDevices = users.as(\"u\")\n",
    "  .join(eventsAggregated.as(\"e\"), col(\"e.user_id\") === col(\"u.user_id\"))\n",
    "  .groupBy(col(\"u.user_id\"))\n",
    "  .agg(\n",
    "    max(col(\"e.event_counts\")).as(\"total_hits\"),\n",
    "    collect_list(col(\"e.device_id\")).as(\"devices\")\n",
    "  )\n",
    "\n",
    "// Per device (id and type): the user ids that appear with it in the aggregated events.\n",
    "// The grouping keys are not repeated inside agg(): groupBy already emits them, and repeating them\n",
    "// yields duplicate `device_id` / `device_type` columns in the result.\n",
    "val devicesOnEvents = devices\n",
    "  .join(eventsAggregated, devices(\"device_id\") === eventsAggregated(\"device_id\"))\n",
    "  .groupBy(devices(\"device_id\"), devices(\"device_type\"))\n",
    "  .agg(collect_list(eventsAggregated(\"user_id\")).as(\"users\"))\n",
    "\n",
    "// Print the physical plan of each join, devices first, then users.\n",
    "Seq(devicesOnEvents, usersAndDevices).foreach(_.explain())\n",
    "\n",
    "// Fetch one row from each result; this is what actually runs the jobs.\n",
    "devicesOnEvents.head(1)\n",
    "usersAndDevices.head(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "c36feefd-a8cd-4749-81fd-8f7883349ba1",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "res18: eventsAggregated.type = [user_id: int, device_id: int ... 2 more fields]\n"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "// Drop the persisted aggregate once the joins above no longer need it (non-blocking, the default).\n",
    "eventsAggregated.unpersist(blocking = false)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "02886a49-4dc5-4e00-9cad-20926166f388",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "spylon-kernel",
   "language": "scala",
   "name": "spylon-kernel"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "help_links": [
    {
     "text": "MetaKernel Magics",
     "url": "https://metakernel.readthedocs.io/en/latest/source/README.html"
    }
   ],
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala",
   "version": "0.4.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
